package org.zjvis.datascience.service.cleanup;

import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Joiner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.zjvis.datascience.common.constant.Constant;
import org.zjvis.datascience.common.dto.RecommendDTO;
import org.zjvis.datascience.common.widget.dto.WidgetDTO;
import org.zjvis.datascience.common.util.CollectionUtil;
import org.zjvis.datascience.common.util.TimeUtil;
import org.zjvis.datascience.common.util.db.JDBCUtil;
import org.zjvis.datascience.service.RecommendService;
import org.zjvis.datascience.service.TaskInstanceService;
import org.zjvis.datascience.service.WidgetService;
import org.zjvis.datascience.service.dataprovider.GPDataProvider;

import javax.servlet.ServletContext;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;

import static org.zjvis.datascience.common.constant.DatabaseConstant.DEFAULT_DATASET_ID;

/**
 * Scheduler that cleans up historical temporary/result tables produced by
 * data-cleaning operations (clean nodes, recommendation views, spark external
 * tables). Each job runs on its own nightly cron.
 *
 * @date 2021-12-01
 */
@Service
public class CleanUpTemporaryTableScheduler {

    /**
     * Number of most recent runs whose temporary tables are retained per task.
     */
    public static final Integer RETAIN_NUM = 2;

    /**
     * Last computed cutoff date, exposed via {@link #getLimitDate()}.
     * NOTE(review): static mutable state written by two different scheduled jobs
     * (cleanUpTemporaryTable at 03:00 and cleanUpCleanHistory at 23:00); readers
     * observe whichever job ran last. Confirm consumers only need the most
     * recent value before relying on it.
     */
    private static String limitDate;

    /**
     * Now minus 48 hours: cutoff for querying temp tables older than two days.
     */
    private static final int DECREMENT_HOUR = -48;

    /**
     * Now minus 72 hours: cutoff for querying records older than three days.
     */
    private static final int EXPAND_HOUR = -72;

    /**
     * Number of recommendation tables dropped per submitted cleanup task.
     */
    private static final int RECOMMEND_DROP_BATCH_SIZE = 16;

    /**
     * Number of statements per JDBC batch/commit when dropping spark tables.
     */
    private static final int SPARK_DROP_BATCH_SIZE = 10000;

    @Autowired
    private TaskInstanceService taskInstanceService;

    @Autowired
    private GPDataProvider gpDataProvider;

    @Autowired
    private CleanUpThreadPool cleanUpThreadPool;

    @Autowired
    private RecommendService recommendService;

    @Autowired
    private WidgetService widgetService;

    // NOTE(review): not referenced anywhere in this class; kept for
    // compatibility in case it is needed by future jobs — confirm and remove.
    @Autowired
    private ServletContext servletContext;

    private static final Logger logger = LoggerFactory.getLogger("CleanUpTemporaryTableScheduler");

    /**
     * Formats "now + hourOffset hours" with the project-wide date format.
     *
     * @param hourOffset signed hour delta (negative values look back in time)
     * @return formatted cutoff date string
     */
    private static String cutoffDate(int hourOffset) {
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.HOUR, hourOffset);
        return TimeUtil.formatDate(new Date(calendar.getTimeInMillis()));
    }

    /**
     * Nightly (03:00) job: for every task that accumulated more than
     * {@link #RETAIN_NUM} temporary tables older than two days, submits an
     * asynchronous cleanup runner that drops the redundant tables.
     */
    @Scheduled(cron = "0 0 3 * * ?")
    public void cleanUpTemporaryTable() {
        logger.warn("CleanUpTemporaryTableScheduler.cleanUpTemporaryTable() start time -> {}",
                System.currentTimeMillis());
        limitDate = cutoffDate(DECREMENT_HOUR);

        // Task ids that still own more than RETAIN_NUM temp tables from before the cutoff.
        List<Long> pendingTaskIds = taskInstanceService
                .queryTaskIdHaveRedundantTable(RETAIN_NUM, limitDate);

        ThreadPoolExecutor pool = cleanUpThreadPool.getExecutor();
        if (!ObjectUtil.isEmpty(pendingTaskIds)) {
            for (Long taskId : pendingTaskIds) {
                pool.submit(CleanUpRunner.init(taskId, taskInstanceService));
            }
        }
    }

    /**
     * Intentionally disabled: deleting project records caused deadlocks because
     * the oversized instance table forced full-table scans that locked rows.
     * The schedule annotation and implementation were removed until the
     * underlying locking problem (e.g. an index on the delete predicate) is
     * addressed. Method kept so external references/configuration stay valid.
     */
    public void cleanUpDeletedProjectRecords() {
        // no-op — see Javadoc above for why this job is disabled
    }

    /**
     * Nightly (23:00) job: deletes history action records left by clean
     * operations more than three days ago. The queried task ids are candidates
     * only — the service decides what is actually removable.
     */
    @Scheduled(cron = "0 0 23 * * ?")
    public void cleanUpCleanHistory() {
        logger.warn("CleanUpTemporaryTableScheduler.cleanUpCleanHistory() start time -> {}",
                System.currentTimeMillis());
        limitDate = cutoffDate(EXPAND_HOUR);

        // Task ids whose clean nodes still carry a non-empty sessionId from before the cutoff.
        List<Long> pendingTaskIds = taskInstanceService.queryRedundantTableInCleanNode(limitDate);
        logger.info("cleanUpCleanHistory -> pendingTaskIds: {}", pendingTaskIds);

        if (!ObjectUtil.isEmpty(pendingTaskIds)) {
            for (Long taskId : pendingTaskIds) {
                taskInstanceService.deleteHistoryActions(taskId);
                logger.info("cleanUpCleanHistory cleanup taskId={} historyRecords finish. ", taskId);
            }
        }
    }

    /**
     * Nightly (02:00) job: drops recommendation temp tables created the day
     * before yesterday, except those still referenced by a visualization
     * widget. One extra day is retained because users may not consume a freshly
     * generated recommendation view immediately.
     */
    @Scheduled(cron = "0 0 2 * * ?")
    public void cleanUpRecommendTables() {
        logger.warn("CleanUpTemporaryTableScheduler.cleanUpRecommendTables() start time -> {}",
                System.currentTimeMillis());
        // Deletion window is [two days ago, one day ago).
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.DAY_OF_MONTH, -1);
        String endDate = TimeUtil.formatDate(new Date(calendar.getTimeInMillis()));
        calendar.add(Calendar.DAY_OF_MONTH, -1);
        String startDate = TimeUtil.formatDate(new Date(calendar.getTimeInMillis()));
        // Recommendation temp tables generated inside the window.
        List<RecommendDTO> recommendDTOList = recommendService.queryByModify(startDate, endDate);
        if (ObjectUtil.isEmpty(recommendDTOList)) {
            return;
        }
        // Widgets that may still depend on a recommendation table.
        List<WidgetDTO> widgets = widgetService.queryTidByModify(startDate);
        ExpandBitSet expandBitSet = new ExpandBitSet();
        if (ObjectUtil.isNotEmpty(widgets)) {
            for (WidgetDTO widget : widgets) {
                if ("recommend".equals(widget.getType())) {
                    expandBitSet.set(widget.getTid());
                } else {
                    JSONObject dataJson = JSONObject.parseObject(widget.getDataJson());
                    // Guard against widgets whose dataJson carries no formData —
                    // the original code would NPE here and abort the whole sweep.
                    JSONObject formData = dataJson == null ? null : dataJson.getJSONObject("formData");
                    if (formData != null && "recommend".equals(formData.getString("dataType"))) {
                        expandBitSet.set(formData.getLongValue("dataId"));
                    }
                }
            }
        }
        // Keep only tables no widget references.
        List<String> tables = new ArrayList<>(recommendDTOList.size());
        for (RecommendDTO recommend : recommendDTOList) {
            if (!expandBitSet.get(recommend.getId())) {
                tables.add(recommend.getTableNames());
            }
        }
        // Drop in batches of RECOMMEND_DROP_BATCH_SIZE, one async task per batch.
        for (int i = 0; i < tables.size(); i = i + RECOMMEND_DROP_BATCH_SIZE) {
            List<String> subTables =
                    tables.subList(i, Math.min(i + RECOMMEND_DROP_BATCH_SIZE, tables.size()));
            cleanUpThreadPool.getExecutor()
                    .submit(CleanUpRecommendTablesRunner.init(Joiner.on(",").join(subTables)));
        }
    }

    /**
     * Nightly (01:00) job: drops external tables created by the
     * greenplum-spark-connector. The provider returns ready-to-run SQL
     * statements which are executed in large JDBC batches.
     */
    @Scheduled(cron = "0 0 1 * * ?")
    public void cleanUpSparkTables() {
        logger.warn("CleanUpTemporaryTableScheduler.cleanUpSparkTables() start time -> {}",
                System.currentTimeMillis());
        List<String> tables = gpDataProvider.querySparkTables();
        if (CollectionUtil.isEmpty(tables)) {
            logger.info("no table found");
            return;
        }
        logger.info("cleanUpSparkTables(), the number of tables queried is {}", tables.size());
        Connection conn = null;
        Statement st = null;
        try {
            conn = gpDataProvider.getConn(DEFAULT_DATASET_ID);
            conn.setAutoCommit(false);
            st = conn.createStatement();
            for (int i = 0; i < tables.size(); i++) {
                st.addBatch(tables.get(i));
                // Flush every SPARK_DROP_BATCH_SIZE statements. The original
                // condition (i % 10000 == 0) fired a spurious one-statement
                // batch at i == 0.
                if ((i + 1) % SPARK_DROP_BATCH_SIZE == 0) {
                    st.executeBatch();
                    conn.commit();
                }
            }
            // Flush the trailing partial batch.
            st.executeBatch();
            conn.commit();
        } catch (Exception e) {
            // Log the full stack trace (the original kept only e.getMessage()).
            logger.error("CleanUpTemporaryTableScheduler.cleanUpSparkTables() error", e);
            if (conn != null) {
                try {
                    // autoCommit is off: abandon the partially executed batch.
                    conn.rollback();
                } catch (Exception rollbackError) {
                    logger.error("cleanUpSparkTables() rollback failed", rollbackError);
                }
            }
        } finally {
            JDBCUtil.close(conn, st, null);
        }
    }

    /**
     * @return the cutoff date computed by the most recently executed cleanup
     * job, or {@code null} if none has run yet in this JVM
     */
    public static String getLimitDate() {
        return limitDate;
    }

}
